module Remote.Compute (
remote,
- Interface,
ComputeState(..),
setComputeState,
getComputeStates,
- InterfaceEnv,
- interfaceEnv,
getComputeProgram,
runComputeProgram,
) where
import qualified Utility.SimpleProtocol as Proto
import Network.HTTP.Types.URI
-import Control.Concurrent.STM
import Data.Time.Clock
-import Data.Either
import Text.Read
import qualified Data.Map as M
import qualified Data.Set as S
gen r u rc gc rs = case getComputeProgram rc of
Left _err -> return Nothing
Right program -> do
- interface <- liftIO $ newTMVarIO Nothing
c <- parsedRemoteConfig remote rc
cst <- remoteCost gc c veryExpensiveRemoteCost
- return $ Just $ mk program interface c cst
+ return $ Just $ mk program c cst
where
- mk program interface c cst = Remote
+ mk program c cst = Remote
{ uuid = u
, cost = cst
, name = Git.repoDescribe r
, storeKey = storeKeyUnsupported
- , retrieveKeyFile = computeKey rs program interface
+ , retrieveKeyFile = computeKey rs program
, retrieveKeyFileInOrder = pure True
, retrieveKeyFileCheap = Nothing
, retrievalSecurityPolicy = RetrievalAllKeysSecure
, removeKey = dropKey rs
, lockContent = Nothing
- , checkPresent = checkKey rs program interface
+ , checkPresent = checkKey rs
, checkPresentCheap = False
, exportActions = exportUnsupported
, importActions = importUnsupported
gitConfigSpecialRemote u c [("compute", "true")]
return (c, u)
--- The RemoteConfig is allowed to contain fields from the program's
--- interface. That provides defaults for git-annex addcomputed.
computeConfigParser :: RemoteConfig -> Annex RemoteConfigParser
-computeConfigParser rc = do
- Interface interface <- case getComputeProgram rc of
- Left _ -> pure $ Interface []
- Right program -> liftIO (getInterface program) >>= return . \case
- Left _ -> Interface []
- Right interface -> interface
- let m = M.fromList $ mapMaybe collectfields interface
- let ininterface f = M.member (Field (fromProposedAccepted f)) m
- return $ RemoteConfigParser
- { remoteConfigFieldParsers =
- [ optionalStringParser programField
- (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")")
- ]
- , remoteConfigRestPassthrough = Just
- ( ininterface
- , M.toList $ M.mapKeys fromField m
- )
- }
- where
- collectfields (InterfaceInput f d) = Just (f, FieldDesc d)
- collectfields (InterfaceOptionalInput f d) = Just (f, FieldDesc d)
- collectfields (InterfaceValue f d) = Just (f, FieldDesc d)
- collectfields (InterfaceOptionalValue f d) = Just (f, FieldDesc d)
- collectfields _ = Nothing
+computeConfigParser _ = return $ RemoteConfigParser
+ { remoteConfigFieldParsers =
+ [ optionalStringParser programField
+ (FieldDesc $ "compute program (must start with \"" ++ safetyPrefix ++ "\")")
+ ]
+ -- Pass through all other params, which git-annex addcomputed adds
+ -- to the input params.
+ , remoteConfigRestPassthrough = Just
+ ( const True
+ , []
+ )
+ }
newtype ComputeProgram = ComputeProgram String
deriving (Show)
programField :: RemoteConfigField
programField = Accepted "program"
-type Description = String
-
-newtype Field = Field { fromField :: String }
- deriving (Show, Eq, Ord)
-
-data InterfaceItem
- = InterfaceInput Field Description
- | InterfaceOptionalInput Field Description
- | InterfaceValue Field Description
- | InterfaceOptionalValue Field Description
- | InterfaceOutput Field Description
- | InterfaceReproducible
- deriving (Show, Eq)
-
--- List order matters, because when displaying the interface to the
--- user, need to display it in the same order as the program
--- does.
-data Interface = Interface [InterfaceItem]
- deriving (Show, Eq)
-
-instance Proto.Receivable InterfaceItem where
- parseCommand "INPUT" = Proto.parse2 InterfaceInput
- parseCommand "INPUT?" = Proto.parse2 InterfaceOptionalInput
- parseCommand "VALUE" = Proto.parse2 InterfaceValue
- parseCommand "VALUE?" = Proto.parse2 InterfaceOptionalValue
- parseCommand "OUTPUT" = Proto.parse2 InterfaceOutput
- parseCommand "REPRODUCIBLE" = Proto.parse0 InterfaceReproducible
- parseCommand _ = Proto.parseFail
-
-data ProcessOutput
- = Computing Field FilePath
- | Progress PercentFloat
+data ProcessCommand
+ = ProcessInput FilePath
+ | ProcessOutput FilePath
+ | ProcessReproducible
+ | ProcessProgress PercentFloat
deriving (Show, Eq)
-instance Proto.Receivable ProcessOutput where
- parseCommand "COMPUTING" = Proto.parse2 Computing
- parseCommand "PROGRESS" = Proto.parse1 Progress
+instance Proto.Receivable ProcessCommand where
+ parseCommand "INPUT" = Proto.parse1 ProcessInput
+ parseCommand "OUTPUT" = Proto.parse1 ProcessOutput
+ parseCommand "REPRODUCIBLE" = Proto.parse0 ProcessReproducible
+ parseCommand "PROGRESS" = Proto.parse1 ProcessProgress
parseCommand _ = Proto.parseFail
-instance Proto.Serializable Field where
- serialize = fromField
- deserialize = Just . Field
-
newtype PercentFloat = PercentFloat Float
deriving (Show, Eq)
serialize (PercentFloat p) = show p
deserialize s = PercentFloat <$> readMaybe s
-getInterfaceCached :: ComputeProgram -> TMVar (Maybe Interface) -> IO (Either String Interface)
-getInterfaceCached program iv =
- atomically (takeTMVar iv) >>= \case
- Nothing -> getInterface program >>= \case
- Left err -> do
- atomically $ putTMVar iv Nothing
- return (Left err)
- Right interface -> ret interface
- Just interface -> ret interface
- where
- ret interface = do
- atomically $ putTMVar iv (Just interface)
- return (Right interface)
-
-getInterface :: ComputeProgram -> IO (Either String Interface)
-getInterface (ComputeProgram program) =
- catchMaybeIO (readProcess program ["interface"]) >>= \case
- Nothing -> return $ Left $ "Failed to run " ++ program
- Just output -> return $ case parseInterface output of
- Right i -> Right i
- Left err -> Left $ program ++ " interface output problem: " ++ err
-
-parseInterface :: String -> Either String Interface
-parseInterface = go [] . lines
- where
- go is []
- | null is = Left "empty interface output"
- | otherwise = Right (Interface (reverse is))
- go is (l:ls)
- | null l = go is ls
- | otherwise = case Proto.parseMessage l of
- Just i -> go (i:is) ls
- Nothing -> Left $ "Unable to parse line: \"" ++ l ++ "\""
-
-data ComputeInput = ComputeInput Key FilePath
- deriving (Show, Eq)
-
-data ComputeValue = ComputeValue String
- deriving (Show, Eq)
-
-data ComputeOutput = ComputeOutput Key
- deriving (Show, Eq)
-
data ComputeState = ComputeState
- { computeInputs :: M.Map Field ComputeInput
- , computeValues :: M.Map Field ComputeValue
- , computeOutputs :: M.Map Field ComputeOutput
+ { computeParams :: [String]
+ , computeInputs :: M.Map FilePath Key
+ , computeOutputs :: M.Map FilePath (Maybe Key)
+ , computeReproducible :: Bool
}
deriving (Show, Eq)
{- Formats a ComputeState as an URL query string.
-
- - Prefixes fields with "k" and "f" for computeInputs, with
- - "v" for computeValues and "o" for computeOutputs.
+ - Prefixes computeParams with 'p', computeInputs with 'i',
+ - and computeOutput with 'o'.
-
- When the passed Key is an output, rather than duplicate it
- in the query string, that output has no value.
-
- - Fields in the query string are sorted. This is in order to ensure
- - that the same ComputeState is always formatted the same way.
+ - Example: "psomefile&pdestfile&pbaz&isomefile=WORM--foo&odestfile="
-
- - Example: "ffoo=somefile&kfoo=WORM--foo&oresult&vbar=11"
+ - The computeParams are in the order they were given. The computeInputs
+ - and computeOutputs are sorted in ascending order for stability.
-}
formatComputeState :: Key -> ComputeState -> B.ByteString
-formatComputeState k st = renderQuery False $ sortOn fst $ concat
- [ concatMap formatinput $ M.toList (computeInputs st)
- , map formatvalue $ M.toList (computeValues st)
- , map formatoutput $ M.toList (computeOutputs st)
+formatComputeState k st = renderQuery False $ concat
+ [ map formatparam (computeParams st)
+ , map formatinput (M.toAscList (computeInputs st))
+ , mapMaybe formatoutput (M.toAscList (computeOutputs st))
]
where
- formatinput (f, ComputeInput key file) =
- [ ("k" <> fb, Just (serializeKey' key))
- , ("f" <> fb, Just (toRawFilePath file))
- ]
- where
- fb = encodeBS (fromField f)
- formatvalue (f, ComputeValue v) =
- ("v" <> encodeBS (fromField f), Just (encodeBS v))
- formatoutput (f, ComputeOutput key) =
- ("o" <> encodeBS (fromField f),
+ formatparam p = ("p" <> encodeBS p, Nothing)
+ formatinput (file, key) =
+ ("i" <> toRawFilePath file, Just (serializeKey' key))
+ formatoutput (file, (Just key)) = Just $
+ ("o" <> toRawFilePath file,
if key == k
then Nothing
else Just (serializeKey' key)
)
+ formatoutput (_, Nothing) = Nothing
parseComputeState :: Key -> B.ByteString -> Maybe ComputeState
parseComputeState k b =
- let q = parseQuery b
- st = go emptycomputestate (M.fromList q) q
+ let st = go emptycomputestate (parseQuery b)
in if st == emptycomputestate then Nothing else Just st
where
- emptycomputestate = ComputeState mempty mempty mempty
- go c _ [] = c
- go c m ((f, v):rest) =
+ emptycomputestate = ComputeState mempty mempty mempty False
+ go :: ComputeState -> [QueryItem] -> ComputeState
+ go c [] = c { computeParams = reverse (computeParams c) }
+ go c ((f, v):rest) =
let c' = fromMaybe c $ case decodeBS f of
- ('f':f') -> do
- file <- fromRawFilePath <$> v
- kv <- M.lookup (encodeBS ('k':f')) m
- key <- deserializeKey' =<< kv
+ ('p':p) -> Just $ c
+ { computeParams = p : computeParams c
+ }
+ ('i':i) -> do
+ key <- deserializeKey' =<< v
Just $ c
- { computeInputs =
- M.insert (Field f')
- (ComputeInput key file)
+ { computeInputs =
+ M.insert i key
(computeInputs c)
}
- ('v':f') -> do
- val <- decodeBS <$> v
- Just $ c
- { computeValues =
- M.insert (Field f')
- (ComputeValue val)
- (computeValues c)
- }
- ('o':f') -> case v of
+ ('o':o) -> case v of
Just kv -> do
key <- deserializeKey' kv
Just $ c
- { computeOutputs =
- M.insert (Field f')
- (ComputeOutput key)
+ { computeOutputs =
+ M.insert o (Just key)
(computeOutputs c)
}
Nothing -> Just $ c
- { computeOutputs =
- M.insert (Field f')
- (ComputeOutput k)
+ { computeOutputs =
+ M.insert o (Just k)
(computeOutputs c)
}
_ -> Nothing
- in go c' m rest
+ in go c' rest
{- The per remote metadata is used to store ComputeState. This allows
- recording multiple ComputeStates that generate the same key.
Just ts -> go (zip (repeat ts) sts : c) rest
Nothing -> go c rest
-data InterfaceEnv = InterfaceEnv [(String, Either Key String)]
-
-data InterfaceOutputs = InterfaceOutputs (M.Map Field Key)
-
-{- Finds the first compute state that provides everything required by the
- - interface, and returns a list of what should be provided to the program
- - in its environment, and what outputs the program is expected to make.
- -}
-interfaceEnv :: [ComputeState] -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
-interfaceEnv states interface = go Nothing states
- where
- go (Just firsterr) [] = Left firsterr
- go Nothing [] = interfaceEnv' (ComputeState mempty mempty mempty) interface
- go firsterr (state:rest) = case interfaceEnv' state interface of
- Right v -> Right v
- Left e
- | null rest -> Left (fromMaybe e firsterr)
- | otherwise -> go (firsterr <|> Just e) rest
-
-interfaceEnv' :: ComputeState -> Interface -> Either String (InterfaceEnv, InterfaceOutputs)
-interfaceEnv' state interface@(Interface i) =
- case partitionEithers (mapMaybe go i) of
- ([], r) -> Right
- ( InterfaceEnv (map (\(f, v) -> (fromField f, v)) r)
- , interfaceOutputs state interface
- )
- (problems, _) -> Left $ unlines problems
- where
- go (InterfaceInput field desc) =
- case M.lookup field (computeInputs state) of
- Just (ComputeInput key _file) -> Just $
- Right (field, Left key)
- Nothing -> Just $
- Left $ "Missing required input \"" ++ fromField field ++ "\" -- " ++ desc
- go (InterfaceOptionalInput field _desc) =
- case M.lookup field (computeInputs state) of
- Just (ComputeInput key _file) -> Just $
- Right (field, Left key)
- Nothing -> Nothing
- go (InterfaceValue field desc) =
- case M.lookup field (computeValues state) of
- Just (ComputeValue v) -> Just $
- Right (field, Right v)
- Nothing -> Just $
- Left $ "Missing required value \"" ++ fromField field ++ "\" -- " ++ desc
- go (InterfaceOptionalValue field _desc) =
- case M.lookup field (computeValues state) of
- Just (ComputeValue v) -> Just $
- Right (field, Right v)
- Nothing -> Nothing
- go (InterfaceOutput _ _) = Nothing
- go InterfaceReproducible = Nothing
-
-interfaceOutputs :: ComputeState -> Interface -> InterfaceOutputs
-interfaceOutputs state (Interface interface) =
- InterfaceOutputs $ M.fromList $ mapMaybe go interface
- where
- go (InterfaceOutput field _) = do
- ComputeOutput key <- M.lookup field (computeOutputs state)
- Just (field, key)
- go _ = Nothing
-
-computeProgramEnvironment :: InterfaceEnv -> Annex [(String, String)]
-computeProgramEnvironment (InterfaceEnv ienv) = do
+computeProgramEnvironment :: ComputeState -> Annex [(String, String)]
+computeProgramEnvironment st = do
environ <- filter (caninherit . fst) <$> liftIO getEnvironment
- interfaceenv <- mapM go ienv
- return $ environ ++ interfaceenv
+ let addenv = mapMaybe go (computeParams st)
+ return $ environ ++ addenv
where
envprefix = "ANNEX_COMPUTE_"
caninherit v = not (envprefix `isPrefixOf` v)
- go (f, Right v) = return (envprefix ++ f, v)
- go (f, Left k) =
- ifM (inAnnex k)
- ( do
- objloc <- calcRepo (gitAnnexLocation k)
- return (envprefix ++ f, fromOsPath objloc)
- , giveup "missing an input to the computation"
- )
+ go p
+ | '=' `elem` p =
+ let (f, v) = separate (== '=') p
+ in Just (envprefix ++ f, v)
+ | otherwise = Nothing
+
+newtype ImmutableState = ImmutableState Bool
runComputeProgram
:: ComputeProgram
- -> Key
- -> AssociatedFile
- -> OsPath
- -> MeterUpdate
- -> VerifyConfig
- -> (InterfaceEnv, InterfaceOutputs)
- -> Annex Verification
-runComputeProgram (ComputeProgram program) k _af dest p vc (ienv, InterfaceOutputs iout) = do
- environ <- computeProgramEnvironment ienv
+ -> ComputeState
+ -> ImmutableState
+ -> (OsPath -> Annex (Key, Maybe OsPath))
+ -> (ComputeState -> OsPath -> Annex v)
+ -> Annex v
+runComputeProgram (ComputeProgram program) state (ImmutableState immutablestate) getinputcontent cont =
withOtherTmp $ \tmpdir ->
- go environ tmpdir
+ go tmpdir
`finally` liftIO (removeDirectoryRecursive tmpdir)
where
- go environ tmpdir = do
- let pr = (proc program [])
- { cwd = Just $ fromOsPath tmpdir
+ go tmpdir = do
+ environ <- computeProgramEnvironment state
+ let pr = (proc program (computeParams state))
+ { cwd = Just (fromOsPath tmpdir)
+ , std_in = CreatePipe
, std_out = CreatePipe
, env = Just environ
}
- computing <- liftIO $ withCreateProcess pr $
- processoutput mempty tmpdir
- finish computing tmpdir
+ state' <- bracket
+ (liftIO $ createProcess pr)
+ (liftIO . cleanupProcess)
+ (getinput state tmpdir)
+ cont state' tmpdir
- processoutput computing tmpdir _ (Just h) _ pid =
- hGetLineUntilExitOrEOF pid h >>= \case
+ getinput state' tmpdir p =
+ liftIO (hGetLineUntilExitOrEOF (processHandle p) (stdoutHandle p)) >>= \case
Just l
- | null l -> processoutput computing tmpdir Nothing (Just h) Nothing pid
- | otherwise -> parseoutput computing l >>= \case
- Just computing' ->
- processoutput computing' tmpdir Nothing (Just h) Nothing pid
- Nothing -> do
- hClose h
- ifM (checkSuccessProcess pid)
- ( giveup $ program ++ " output included an unparseable line: \"" ++ l ++ "\""
- , giveup $ program ++ " exited unsuccessfully"
- )
+ | null l -> getinput state' tmpdir p
+ | otherwise -> do
+ state'' <- parseoutput p state' l
+ getinput state'' tmpdir p
Nothing -> do
- hClose h
- unlessM (checkSuccessProcess pid) $
+ liftIO $ hClose (stdoutHandle p)
+ liftIO $ hClose (stdinHandle p)
+ unlessM (liftIO $ checkSuccessProcess (processHandle p)) $
giveup $ program ++ " exited unsuccessfully"
- return computing
- processoutput _ _ _ _ _ _ = error "internal"
+ return state'
- parseoutput computing l = case Proto.parseMessage l of
- Just (Computing field file) ->
- case M.lookup field iout of
- Just key -> do
- when (key == k) $
- -- XXX can start watching the file and updating progess now
- return ()
- return $ Just $
- M.insert key (toRawFilePath file) computing
- Nothing -> return (Just computing)
- Just (Progress percent) -> do
+ parseoutput p state' l = case Proto.parseMessage l of
+ Just (ProcessInput f) ->
+ let knowninput = M.member f (computeInputs state')
+ in checkimmutable knowninput l $ do
+ (k, mp) <- getinputcontent (toOsPath f)
+ liftIO $ hPutStrLn (stdinHandle p) $
+ maybe "" fromOsPath mp
+ return $ if knowninput
+ then state'
+ else state'
+ { computeInputs =
+ M.insert f k
+ (computeInputs state')
+ }
+ Just (ProcessOutput f) ->
+ let knownoutput = M.member f (computeOutputs state')
+ in checkimmutable knownoutput l $
+ return $ if knownoutput
+ then state'
+ else state'
+ { computeOutputs =
+ M.insert f Nothing
+ (computeOutputs state')
+ }
+ Just (ProcessProgress percent) -> do
-- XXX
- return Nothing
- Nothing -> return Nothing
-
- finish computing tmpdir = do
- case M.lookup k computing of
- Nothing -> giveup $ program ++ " exited successfully, but failed to output a filename"
- Just file -> do
- let file' = tmpdir </> file
- unlessM (liftIO $ doesFileExist file') $
- giveup $ program ++ " exited sucessfully, but failed to write the computed file"
- catchNonAsync (liftIO $ moveFile file' dest)
- (\err -> giveup $ "failed to move the computed file: " ++ show err)
+ return state'
+ Just ProcessReproducible ->
+ return $ state' { computeReproducible = True }
+ Nothing -> giveup $
+ program ++ " output included an unparseable line: \"" ++ l ++ "\""
+
+ checkimmutable True _ a = a
+ checkimmutable False l a
+ | not immutablestate = a
+ | otherwise = giveup $
+ program ++ " is not behaving the same way it used to, now outputting: \"" ++ l ++ "\""
+
+computeKey :: RemoteStateHandle -> ComputeProgram -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
+computeKey rs (ComputeProgram program) k af dest p vc = do
+ states <- map snd . sortOn fst -- least expensive probably
+ <$> getComputeStates rs k
+ case mapMaybe computeskey states of
+ ((keyfile, state):_) -> runComputeProgram
+ (ComputeProgram program)
+ state
+ (ImmutableState True)
+ (getinputcontent state)
+ (go keyfile)
+ [] -> giveup "Missing compute state"
+ where
+ getinputcontent state f =
+ case M.lookup (fromOsPath f) (computeInputs state) of
+ Just inputkey -> do
+ obj <- calcRepo (gitAnnexLocation inputkey)
+ -- XXX get input object when not present
+ return (inputkey, Just obj)
+ Nothing -> error "internal"
+
+ computeskey state =
+ case M.keys $ M.filter (== Just k) (computeOutputs state) of
+ (keyfile : _) -> Just (keyfile, state)
+ [] -> Nothing
+
+ go keyfile state tmpdir = do
+ let keyfile' = tmpdir </> toOsPath keyfile
+ unlessM (liftIO $ doesFileExist keyfile') $
+ giveup $ program ++ " exited sucessfully, but failed to write the computed file"
+ catchNonAsync (liftIO $ moveFile keyfile' dest)
+ (\err -> giveup $ "failed to move the computed file: " ++ show err)
-- Try to move any other computed object files into the annex.
- forM_ (M.toList computing) $ \(key, file) ->
- when (k /= key) $ do
- let file' = tmpdir </> file
- whenM (liftIO $ doesFileExist file') $
- whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $
- void $ tryNonAsync $ moveAnnex k file'
+ forM_ (M.toList $ computeOutputs state) $ \case
+ (file, (Just key)) ->
+ when (k /= key) $ do
+ let file' = tmpdir </> toOsPath file
+ whenM (liftIO $ doesFileExist file') $
+ whenM (verifyKeyContentPostRetrieval RetrievalAllKeysSecure vc verification k file') $
+ void $ tryNonAsync $ moveAnnex k file'
+ _ -> noop
return verification
-- verification.
verification = MustVerify
-computeKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> AssociatedFile -> OsPath -> MeterUpdate -> VerifyConfig -> Annex Verification
-computeKey rs program iv k af dest p vc =
- liftIO (getInterfaceCached program iv) >>= \case
- Left err -> giveup err
- Right interface -> do
- states <- map snd . sortOn fst
- <$> getComputeStates rs k
- either giveup (runComputeProgram program k af dest p vc)
- (interfaceEnv states interface)
-
--- Make sure that the compute state has everything needed by
--- the program's current interface.
-checkKey :: RemoteStateHandle -> ComputeProgram -> TMVar (Maybe Interface) -> Key -> Annex Bool
-checkKey rs program iv k = do
- states <- map snd <$> getComputeStates rs k
- liftIO (getInterfaceCached program iv) >>= \case
- Left err -> giveup err
- Right interface ->
- case interfaceEnv states interface of
- Right _ -> return True
- Left _ -> return False
+-- Make sure that the compute state exists.
+checkKey :: RemoteStateHandle -> Key -> Annex Bool
+checkKey rs k = do
+ states <- getComputeStates rs k
+ if null states
+ then giveup "Missing compute state"
+ else return True
-- Unsetting the compute state will prevent computing the key.
dropKey :: RemoteStateHandle -> Maybe SafeDropProof -> Key -> Annex ()